[Enh]: Add Expr.map_batches to pyspark #3579
pedro-villanueva-bcom wants to merge 11 commits into narwhals-dev:main

Conversation
force-pushed from c95fa6f to e5b315f
I don't understand this test failure: https://github.com/narwhals-dev/narwhals/actions/runs/25050142610/job/73375481418?pr=3579

This coverage test is also strange: https://github.com/narwhals-dev/narwhals/actions/runs/25050142614/job/73375481310?pr=3579
force-pushed from 491bdd7 to 0d622c0
Hey @pedro-villanueva-bcom - thanks for taking the initiative! I am not sure we should support `map_batches` for lazy backends, but I am open to seeing how this plays out. Regarding your questions:

1. I am not sure, but it would not be the first time that something passes for pyspark but not for pyspark-connect.
2. Coverage is calculated with the SQLFrame backend, so you will need to add a […]
Any specific reason for this? In my mind (and use case), UDFs are just another type of expression to create a column. It has performance implications for sure, but in my case there's no other choice: these are mostly statistical functions, like getting a p-value from a column of z-scores, for example.

My use case is a library of statistical functions for large datasets that works for pyspark, pyspark-connect, and snowpark. I want it to work with in-memory backends too, both to handle small data and to make testing faster. I discovered narwhals and I'm quite happy with it: the syntax is nice (nicer than ibis) and migrating is not super hard.
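For concreteness, the z-score to p-value computation mentioned here boils down to scipy's normal survival function; this one-liner is my sketch, not code from the PR:

```python
from scipy import stats

# one-sided p-value from a z-score: survival function of the standard normal
p_value = stats.norm.sf(abs(z_score))  # z_score: float or numpy array
```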
changed the title: Expr.map _batches to pyspark → Expr.map_batches to pyspark
force-pushed from 1572e5f to 00eaa83
Thanks for the PR! Looking at the docs, I'm also inclined to decline this feature, I'm afraid, as it looks like a massive performance footgun. Which statistical summaries are you looking to do? In any case, I'd suggest making a helper function within your own code for this: I'm extremely hesitant to use anything from the pandas API in pyspark.
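To make the suggestion concrete, here is a minimal sketch of such a per-backend helper; the function name, the dispatch via `Implementation.is_pyspark()`, and the error for other backends are my assumptions, not anything from this thread:

```python
import narwhals as nw


def with_norm_p_value(df: nw.LazyFrame, z_col: str, out_col: str) -> nw.LazyFrame:
    """Hypothetical helper: add a p-value column, dispatching on the backend."""
    if df.implementation.is_pyspark():
        import pandas as pd
        import pyspark.sql.functions as F
        from pyspark.sql.types import DoubleType
        from scipy import stats

        @F.pandas_udf(DoubleType())
        def sf(batch: pd.Series) -> pd.Series:
            # survival function of the standard normal, vectorized over the batch
            return pd.Series(stats.norm.sf(batch))

        native = df.to_native().withColumn(out_col, sf(F.abs(F.col(z_col))))
        return nw.from_native(native)
    raise NotImplementedError("add branches for the other backends you support")
```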
They are less performant than native expressions, but there's a reason pyspark (and snowpark, and every other backend I know of) allows UDFs: sometimes that's the only way to get something done. I think the end user has to make the choices and tradeoffs, not the library. We could emit a warning, like the one shown when the pandas backend has to fall back to `apply` for a complex aggregation.

Example UDF:

```python
import numpy as np


@udf(packages=["scipy"])  # snowpark-style decorator taking a packages argument
def norm_p_value(value):  # type: ignore[no-untyped-def]
    """Normal distribution survival function (vectorized)."""
    import scipy.stats as stats

    if value is None:
        return None
    result = stats.norm.sf(value)
    return result.item() if np.ndim(result) == 0 else result
```
Then I have to use it like:

```python
tmp_input_col = f"__{col}_abs"
df = df.with_columns(nw.col(z_col).abs().alias(tmp_input_col))
df = norm_p_value(df, tmp_input_col, output_col)
df = df.drop(tmp_input_col)
```

instead of simply one chained expression that I can keep chaining from. Not having `map_batches` forces this workaround.
Description
`Expr.map_batches` can be used when native expressions aren't enough, for example for statistical functions. Pyspark has several types of UDFs, including pandas UDFs, which match `map_batches` very well. This PR implements `map_batches` using pandas UDFs. The optional param `returns_scalar` is not supported, as pyspark doesn't allow this: the UDF must return either a pandas Series, something that can be transformed into one, or a scalar that will be broadcast to one. The only change external to the spark backend is the kind of the `map_batches` node, which has been changed from ordered to unordered.
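Not the PR's actual code, but a minimal sketch of the pandas-UDF wrapping described above, including the broadcast of scalar results; `wrap_map_batches` and the `np.ndim` check are my assumptions:

```python
import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf


def wrap_map_batches(fn, return_type):
    """Wrap a batch-wise Python function as a pyspark pandas UDF (sketch)."""

    @pandas_udf(return_type)
    def wrapper(batch: pd.Series) -> pd.Series:
        result = fn(batch)
        if np.ndim(result) == 0:
            # a scalar result is broadcast to the length of the batch
            return pd.Series([result] * len(batch))
        # otherwise: a pandas Series, or something convertible into one
        return pd.Series(result)

    return wrapper
```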
Additionally, the testing fixture that creates the spark session now sets the `PYSPARK_PYTHON` env var, so that UDFs run with that Python (including whatever packages are installed).

What type of PR is this? (check all applicable)
Related issues
Checklist